package chat

import java.lang.reflect.Modifier
import java.util.concurrent.ConcurrentHashMap

import akka.serialization.Serializer
import com.google.protobuf.CodedOutputStream
import com.trueaccord.scalapb.{GeneratedMessage, GeneratedMessageCompanion}
import com.typesafe.scalalogging.StrictLogging

/**
 * @author eiennohito
 * @since 15/09/28
 */
object ScalaProtobufSerializer {
}

class ScalaProtobufSerializer extends Serializer with StrictLogging {
  override def identifier = 41231541

  override def includeManifest = true

  private val cachedAccessors = new ConcurrentHashMap[Class[_], GeneratedMessageCompanion[_ <: GeneratedMessage]]()

  def factoryFor(c: Class[_]): GeneratedMessageCompanion[_ <: GeneratedMessage] = {
    val obj = cachedAccessors.get(c)
    if (obj != null) obj
    else {
      logger.debug(s"creating a deserialization factory for $c")
      val cl = c.getClassLoader
      val compClazz = cl.loadClass(c.getName + "$")
      val fld = compClazz.getField("MODULE$")
      assert(Modifier.isStatic(fld.getModifiers), "MODULE$ field was not static")
      val companion = fld.get(null).asInstanceOf[GeneratedMessageCompanion[_ <: GeneratedMessage]]
      cachedAccessors.put(c, companion)
      companion
    }
  }

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]) = {
    manifest match {
      case None => throw new Exception("can't deserialize something, no manifest")
      case Some(c) =>
        val factory = factoryFor(c)
        factory.parseFrom(bytes).asInstanceOf[AnyRef]
    }
  }

  private def serializeObj(m: GeneratedMessage): Array[Byte] = {
    val len = m.serializedSize
    val array = new Array[Byte](len)
    val stream = CodedOutputStream.newInstance(array)
    try {
      m.writeTo(stream)
      array
    } catch {
      case e: Exception =>
        val cls = m.getClass
        throw new ScalaProtobufException(s"${m.toString}: trying to serialize $cls, should be of size $len", e)
    }
  }

  override def toBinary(o: AnyRef) = {
    o match {
      case m: GeneratedMessage =>
        serializeObj(m)
      case _ =>
        throw new Exception(s"can't support objects that do not inherit from ${classOf[GeneratedMessage].getCanonicalName}")
    }
  }
}

class ScalaProtobufException(msg: String, inner: Exception) extends RuntimeException(msg, inner)